package com.shujia.spark.streaming

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date

object Demo8Card {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("ds")
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))


    /**
     * 读取卡口过车数据
     */
    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
     * 1、解析json格式的数据
     *
     */
    val cardAndSpeedDS: DStream[(Long, (Double, Int))] = linesDS.map(line => {
      //使用fastjson工具解析json数据
      val carJson: JSONObject = JSON.parseObject(line)
      //取出卡口编号和车速
      val card: Long = carJson.getLong("card")
      val speed: Double = carJson.getDouble("speed")
      (card, (speed, 1))
    })

    /**
     * 2、实时统计每隔卡口的平均车速，和车的数量
     * 统计最近15秒的车辆，每隔5秒统计一次
     *
     */

    val sumSpeedAndNUmDS: DStream[(Long, (Double, Int))] = cardAndSpeedDS
      .reduceByKeyAndWindow((kv1: (Double, Int), kv2: (Double, Int)) => {
        //计算总的测试
        val sumSpeed: Double = kv1._1 + kv2._1
        //计算车的数量
        val num: Int = kv1._2 + kv2._2
        (sumSpeed, num)
      }, Durations.seconds(15), Durations.seconds(5))

    /**
     * 3、计算平均车速
     *
     */
    val avgSpeedAndNumDs: DStream[(Long, Int, Double)] = sumSpeedAndNUmDS.map {
      case (card: Long, (sumSpeed: Double, num: Int)) =>
        val avgSpeed: Double = sumSpeed / num
        (card, num, avgSpeed)
    }

    /**
     * 4、将统计的结果保存到mysql中
     *
     * 将数据保存到数据库存在的问题
     * 1、如果直接使用foreach。会为每一条数据创建一个链接，效率低，而且会导致数据库压力过大
     * 2、如果将网络链接放在foreach算子的外面，会报错，  网络链接不能再网络中传输
     *
     * 正确写法
     * 使用foreachPartition，只会为每一个分区创建一个数据库链接
     *
     * rdd的foreach和foreachPartition
     * foreach一次处理一条数据
     * foreachPartition： 一次处理一个分区的数据
     *
     */

    avgSpeedAndNumDs.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        //获取统计的时间
        val date = new Date()
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val comDate: String = format.format(date)

        //1、加载驱动
        Class.forName("com.mysql.jdbc.Driver")
        //2、创建链接
        val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata", "root", "123456")
        //3、编写插入数据的sql
        val stat: PreparedStatement = con.prepareStatement("insert into card_avg_speed_and_num(card,com_date,num,avg_speed) values(?,?,?,?)")

        //这里的foreach是迭代器的一个普通方法，不是一个算子
        iter.foreach {
          case (card: Long, num: Int, avgSpeed: Double) =>

            //设置参数
            stat.setLong(1, card)
            stat.setString(2, comDate)
            stat.setInt(3, num)
            stat.setDouble(4, avgSpeed)
            //插入数据
            stat.execute()
        }

        stat.close()
        con.close()
      })

    })


    avgSpeedAndNumDs.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }

}
